package com.gitee.jastee.kafka.Interceptor;

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

/**
 * 生产者拦截器
 * @Author jast
 * @Date 2020/4/19 下午2:09
 * @Version 1.0
 */
public class JastProducerInterceptor implements ProducerInterceptor<String,String> {

    /**
     * 该方法会在消息发送之前被调用。如果你想在发送之前对消息进行处理，这个方法是你唯一的机会
     * @param producerRecord
     * @return
     */
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
     //   System.out.println(producerRecord.toString());
//        System.out.println("发送之前进来了");
        if(JSONUtil.isJson(producerRecord.value())){
            JSONObject jsonObject = JSONUtil.parseObj(producerRecord.value());
            jsonObject.set("producer_time",System.currentTimeMillis());
            return new ProducerRecord<String,String>(producerRecord.topic(),jsonObject.toJSONString(0));
        }
        return producerRecord;
    }

    /**
     * 该方法会在消息成功提交或发送失败之后被调用
     * 这个方法和 onSend 不是在同一个线程中被调用的，因此如果你在这两个方法中调用了某个共享可变对象，一定要保证线程安全
     * 这个方法处在 Producer 发送的主路径中，所以最好别放一些太重的逻辑进去，否则你会发现你的 Producer TPS 直线下降
     * @param recordMetadata
     * @param e
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
      //  System.out.println("提交完成或失败进来了");
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}
